dec70940e91a8308c0c842d67275e1e9c8ce382b,PolygonRDD.java,PolygonRDD,SpatialJoinQuery,#PolygonRDD#number#number#number#,80

Before Change


		// Key the polygons held by this object (the join targets) with an Integer produced by
		// PartitionAssignGridPolygon -- presumably a grid-cell id; confirm against that class.
		PartitionAssignGridPolygon targetGridAssigner=new PartitionAssignGridPolygon(
				GridNumberHorizontal,GridNumberVertical,gridHorizontalBorder,gridVerticalBorder);
		JavaPairRDD<Integer,Polygon> TargetSetWithID=this.polygonRDD.mapPartitionsToPair(targetGridAssigner);
		// Key the query polygons the same way, using a separate function instance built from the same grid parameters.
		PartitionAssignGridPolygon queryGridAssigner=new PartitionAssignGridPolygon(
				GridNumberHorizontal,GridNumberVertical,gridHorizontalBorder,gridVerticalBorder);
		JavaPairRDD<Integer,Polygon> QueryAreaSetWithID=polygonRDD.getPolygonRDD().mapPartitionsToPair(queryGridAssigner);
		// Cogroup the two keyed datasets: per key, first iterable = query polygons, second iterable = target polygons.
		JavaPairRDD<Integer, Tuple2<Iterable<Polygon>, Iterable<Polygon>>> jointSet=
				QueryAreaSetWithID.cogroup(TargetSetWithID);
//For every query polygon, list the target polygons under the same cogroup key that satisfy the join predicate
		JavaPairRDD<Polygon,String> result=jointSet.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Polygon>, Iterable<Polygon>>>, Polygon,String>()
				{

			/**
			 * Tests each query polygon of one cogroup entry against each target polygon of the same entry.
			 *
			 * @param t cogroup entry: the key, then (query polygons, target polygons)
			 * @return one tuple per query polygon; the value holds "|coordinates|" for every matching
			 *         target and is the empty string when nothing matched
			 */
			public Iterable<Tuple2<Polygon, String>> call(
					Tuple2<Integer, Tuple2<Iterable<Polygon>, Iterable<Polygon>>> t)
					throws Exception {
				ArrayList<Tuple2<Polygon, String>> queryAreaAndMatches=new ArrayList<Tuple2<Polygon, String>>();

				for(Polygon currentQueryArea : t._2()._1())
				{
					StringBuilder matchedTargets=new StringBuilder();
					for(Polygon currentTarget : t._2()._2())
					{
						// condition==0 selects containment; any other value selects intersection.
						boolean matches=(condition==0)
								? currentQueryArea.contains(currentTarget)
								: currentQueryArea.intersects(currentTarget);
						if(matches)
						{
							// getCoordinates() returns an array; appending the array itself would only
							// emit its identity string, so render its elements explicitly.
							matchedTargets.append("|")
									.append(java.util.Arrays.toString(currentTarget.getCoordinates()))
									.append("|");
						}
					}

					// A tuple is emitted even when no target matched (empty value).
					queryAreaAndMatches.add(new Tuple2<Polygon, String>(currentQueryArea,matchedTargets.toString()));
				}

				return queryAreaAndMatches;
			}

		});
//Merge all match strings that share the same query polygon key (plain concatenation; repeated targets are not removed)
		JavaPairRDD<Polygon,String> refinedResult=result.reduceByKey(new Function2<String,String,String>(){

			/**
			 * Combines two match strings of the same key.
			 *
			 * @param v1 first match string, possibly empty
			 * @param v2 second match string, possibly empty
			 * @return the non-empty one when only one has content, otherwise v1 followed by v2
			 */
			public String call(String v1, String v2) throws Exception {
				// Compare by content: == / != on String only tests reference identity.
				if(v1.isEmpty())
				{
					return v2;
				}
				if(v2.isEmpty())
				{
					return v1;
				}
				return v1+v2;
			}});
		//Persisting the result on HDFS is currently disabled; the RDD is handed back to the caller instead
		//refinedResult.repartition(1).saveAsTextFile(OutputLocation);
		return refinedResult;

After Change


		// Key the polygons held by this object (the join targets) with an Integer produced by
		// PartitionAssignGridPolygon -- presumably a grid-cell id; confirm against that class.
		PartitionAssignGridPolygon targetGridAssigner=new PartitionAssignGridPolygon(
				GridNumberHorizontal,GridNumberVertical,gridHorizontalBorder,gridVerticalBorder);
		JavaPairRDD<Integer,Polygon> TargetSetWithID=this.polygonRDD.mapPartitionsToPair(targetGridAssigner);
		// Key the query polygons the same way, using a separate function instance built from the same grid parameters.
		PartitionAssignGridPolygon queryGridAssigner=new PartitionAssignGridPolygon(
				GridNumberHorizontal,GridNumberVertical,gridHorizontalBorder,gridVerticalBorder);
		JavaPairRDD<Integer,Polygon> QueryAreaSetWithID=polygonRDD.getPolygonRDD().mapPartitionsToPair(queryGridAssigner);
		// Cogroup the two keyed datasets, then repartition to twice the combined partition count of the inputs.
		JavaPairRDD<Integer, Tuple2<Iterable<Polygon>, Iterable<Polygon>>> cogroupedSet=
				QueryAreaSetWithID.cogroup(TargetSetWithID);
		int joinedPartitionCount=(QueryAreaSetWithID.partitions().size()+TargetSetWithID.partitions().size())*2;
		JavaPairRDD<Integer, Tuple2<Iterable<Polygon>, Iterable<Polygon>>> jointSet=
				cogroupedSet.repartition(joinedPartitionCount);
		//Pair every query polygon with each target polygon under the same cogroup key that satisfies the join predicate
				JavaPairRDD<Polygon,Polygon> queryResult=jointSet.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer,Tuple2<Iterable<Polygon>, Iterable<Polygon>>>, Polygon,Polygon>()
						{

					/**
					 * Emits a (query polygon, target polygon) tuple for each pair of one cogroup entry
					 * that passes the predicate; query polygons without a match emit nothing.
					 *
					 * @param t cogroup entry: the key, then (query polygons, target polygons)
					 * @return the matching pairs, in iteration order
					 */
					public Iterable<Tuple2<Polygon, Polygon>> call(
							Tuple2<Integer, Tuple2<Iterable<Polygon>, Iterable<Polygon>>> t)
							throws Exception {
						ArrayList<Tuple2<Polygon, Polygon>> matchedPairs=new ArrayList<Tuple2<Polygon, Polygon>>();

						for(Polygon queryArea : t._2()._1())
						{
							for(Polygon target : t._2()._2())
							{
								// condition==0 selects containment; any other value selects intersection.
								boolean satisfied=(condition==0)
										? queryArea.contains(target)
										: queryArea.intersects(target);
								if(satisfied)
								{
									matchedPairs.add(new Tuple2<Polygon,Polygon>(queryArea,target));
								}
							}
						}

						return matchedPairs;
					}

				});
		//Group the matched targets by query polygon and render the distinct ones as one comma-separated string
				JavaPairRDD<Polygon, Iterable<Polygon>> aggregatedResult=queryResult.groupByKey();
				JavaPairRDD<Polygon,String> refinedResult=aggregatedResult.mapToPair(new PairFunction<Tuple2<Polygon,Iterable<Polygon>>,Polygon,String>()
						{

							/**
							 * Renders the targets matched by one query polygon.
							 *
							 * Each target becomes the concatenation of "x,y" for its coordinates; targets whose
							 * rendering was already emitted are skipped, and the remaining ones are joined with ",".
							 *
							 * @param t query polygon and all targets grouped under it
							 * @return the query polygon paired with the rendered string
							 */
							public Tuple2<Polygon, String> call(Tuple2<Polygon, Iterable<Polygon>> t)
									{
								// Exact-match bookkeeping: a substring search over the accumulated text could
								// wrongly discard a target whose rendering merely occurs inside earlier output.
								java.util.Set<String> emittedTargets=new java.util.HashSet<String>();
								StringBuilder result=new StringBuilder();
								boolean needsComma=false;
								for(Polygon currentTarget : t._2())
								{
									StringBuilder currentTargetString=new StringBuilder();
									for(Coordinate vertex : currentTarget.getCoordinates())
									{
										// NOTE(review): consecutive coordinates are written without a separator
										// ("x1,y1x2,y2"); kept as-is so the output format does not change.
										currentTargetString.append(vertex.x).append(",").append(vertex.y);
									}
									String rendered=currentTargetString.toString();
									// An empty rendering was never emitted by the previous implementation; keep skipping it.
									if(!rendered.isEmpty() && emittedTargets.add(rendered))
									{
										if(needsComma)
										{
											result.append(",");
										}
										result.append(rendered);
										needsComma=true;
									}
								}

								return new Tuple2<Polygon, String>(t._1(),result.toString());
							}

						});
				//Nothing is persisted here; the joined RDD is returned to the caller
				return refinedResult;